package cn.tenmg.flink.jobs.launcher.quickstart.servlet;

import java.io.IOException;
import java.io.PrintWriter;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import cn.tenmg.flink.jobs.FlinkJobsLauncher;
import cn.tenmg.flink.jobs.FlinkJobsLauncher.FlinkJobsInfo;
import cn.tenmg.flink.jobs.config.loader.XMLConfigLoader;
import cn.tenmg.flink.jobs.config.model.FlinkJobs;
import cn.tenmg.flink.jobs.launcher.RestClusterClientFlinkJobsLauncher;
import cn.tenmg.flink.jobs.launcher.quickstart.model.Response;
import cn.tenmg.flink.jobs.launcher.quickstart.utils.AppUtils;
import cn.tenmg.flink.jobs.launcher.quickstart.utils.HttpUtils;
import cn.tenmg.flink.jobs.launcher.utils.HttpClientUtils;

/**
 * flink-jobs任务调度请求处理器
 * 
 * @author June wjzhao@aliyun.com
 *
 */
public class FlinkJobsServlet extends HttpServlet {

	/**
	 * 
	 */
	private static final long serialVersionUID = 121370836842222472L;

	private static final FlinkJobsLauncher flinkJobsLauncher = new RestClusterClientFlinkJobsLauncher();

	@Override
	protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
		String uri = req.getRequestURI();
		if (uri.endsWith("/start")) {// 启动任务
			start(req, resp);
		} else if (uri.endsWith("/stop")) {// 停止任务
			stop(req, resp);
		} else {// 获取任务信息
			info(req, resp);
		}
	}

	private void start(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
		try {
			JSONObject json = JSON.parseObject(HttpUtils.getRequestBody(req));
			FlinkJobs flinkJobs = XMLConfigLoader.getInstance().load(json.getString("flink-jobs"));// 解析XML配置
			PrintWriter pw = resp.getWriter();
			try {
				FlinkJobsInfo appInfo = flinkJobsLauncher.launch(flinkJobs);
				HttpUtils.response(resp, Response.success(true).data(appInfo));
			} catch (Exception e) {
				e.printStackTrace();
			} finally {
				pw.flush();
				pw.close();
			}
		} catch (Exception e) {
			e.printStackTrace();
			HttpUtils.response(resp, Response.success(false).message("请正确填写配置内容"));
		}
	}

	private void stop(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
		JSONObject json = JSON.parseObject(HttpUtils.getRequestBody(req));
		try {
			String savepoint = flinkJobsLauncher.stop(json.getString("jobId"));
			HttpUtils.response(resp, Response.success(true).message("保存点：" + savepoint));
		} catch (Exception e) {
			HttpUtils.response(resp, Response.success(false).message(e.getMessage()));
		}
	}

	private void info(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
		JSONObject json = JSON.parseObject(HttpUtils.getRequestBody(req));
		String flinkRest = AppUtils.getFlinkRest();
		try {
			String jobId = json.getString("jobId"),
					url = flinkRest + (flinkRest.endsWith("/") ? "" : "/" + "jobs/") + jobId;
			json = JSON.parseObject(HttpClientUtils.get(url));
			String state = json.getString("state"), message = null;
			if ("FAILED".equals(state)) {// 运行发生错误，获取异常信息
				String exceptions = HttpClientUtils.get(url + jobId + "/exceptions");
				if (exceptions != null) {
					json = JSON.parseObject(exceptions);
					if (json.containsKey("root-exception")) {
						message = json.getString("root-exception");
					}
				}
			}
			Info info = new Info();
			info.setState(state);
			info.setMessage(message);
			HttpUtils.response(resp, Response.success(true).data(info));
		} catch (Exception e) {
			HttpUtils.response(resp, Response.success(false).message(e.getMessage()));
		}
	}

	public static class Info {

		private String state;

		private String message;

		public String getState() {
			return state;
		}

		public void setState(String state) {
			this.state = state;
		}

		public String getMessage() {
			return message;
		}

		public void setMessage(String message) {
			this.message = message;
		}
	}
}
